package com.at.sink10;

import com.at.bean.WaterSensor;
import com.at.functions5.WaterSensorMapFunction3;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * @author huangchao E-mail:fengquan8866@163.com
 * @version 创建时间：2024/9/25 20:21
 */
public class SinkMysql4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("mysql01", 7777)
                .map(new WaterSensorMapFunction3());

        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法：addSink
         * 2、JDBCSink的4个参数：
         *   Param1：执行的sql，一般就是 insert into
         *   Param2：预编译sql，对占位符填充值
         *   Param3：执行选项 --》 重试
         *   Param4：连接配置，连接信息
         */
        SinkFunction<WaterSensor> mysqlSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                (JdbcStatementBuilder<WaterSensor>) (preparedStatement, waterSensor) -> {
                    preparedStatement.setString(1, waterSensor.getId());
                    preparedStatement.setLong(2, waterSensor.getTs());
                    preparedStatement.setInt(3, waterSensor.getVc());
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3)
                        .withBatchSize(100)
                        .withBatchIntervalMs(3000)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://mysql01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("Password")
                        .withConnectionCheckTimeoutSeconds(60)
                        .build()
        );

        sensorDS.addSink(mysqlSink);

        env.execute();
    }
}
